home *** CD-ROM | disk | FTP | other *** search
/ Celestin Apprentice 5 / Apprentice-Release5.iso / Source Code / C / Applications / Python 1.3.3 / Python 133 SRC / Demo / threads / sync.py < prev    next >
Text File  |  1996-03-11  |  22KB  |  604 lines

  1. # Defines classes that provide synchronization objects.  Note that use of
  2. # this module requires that your Python support threads.
  3. #
  4. #    condition(lock=None)       # a POSIX-like condition-variable object
  5. #    barrier(n)                 # an n-thread barrier
  6. #    event()                    # an event object
  7. #    semaphore(n=1)             # a semaphore object, with initial count n
  8. #    mrsw()                     # a multiple-reader single-writer lock
  9. #
  10. # CONDITIONS
  11. #
  12. # A condition object is created via
  13. #   import this_module
  14. #   your_condition_object = this_module.condition(lock=None)
  15. #
  16. # As explained below, a condition object has a lock associated with it,
  17. # used in the protocol to protect condition data.  You can specify a
  18. # lock to use in the constructor, else the constructor will allocate
  19. # an anonymous lock for you.  Specifying a lock explicitly can be useful
  20. # when more than one condition keys off the same set of shared data.
  21. #
  22. # Methods:
  23. #   .acquire()
  24. #      acquire the lock associated with the condition
  25. #   .release()
  26. #      release the lock associated with the condition
  27. #   .wait()
  28. #      block the thread until such time as some other thread does a
  29. #      .signal or .broadcast on the same condition, and release the
  30. #      lock associated with the condition.  The lock associated with
  31. #      the condition MUST be in the acquired state at the time
  32. #      .wait is invoked.
  33. #   .signal()
  34. #      wake up exactly one thread (if any) that previously did a .wait
  35. #      on the condition; that thread will awaken with the lock associated
  36. #      with the condition in the acquired state.  If no threads are
  37. #      .wait'ing, this is a nop.  If more than one thread is .wait'ing on
  38. #      the condition, any of them may be awakened.
  39. #   .broadcast()
  40. #      wake up all threads (if any) that are .wait'ing on the condition;
  41. #      the threads are woken up serially, each with the lock in the
  42. #      acquired state, so should .release() as soon as possible.  If no
  43. #      threads are .wait'ing, this is a nop.
  44. #
  45. #      Note that if a thread does a .wait *while* a signal/broadcast is
  46. #      in progress, it's guaranteeed to block until a subsequent
  47. #      signal/broadcast.
  48. #
  49. #      Secret feature:  `broadcast' actually takes an integer argument,
  50. #      and will wake up exactly that many waiting threads (or the total
  51. #      number waiting, if that's less).  Use of this is dubious, though,
  52. #      and probably won't be supported if this form of condition is
  53. #      reimplemented in C.
  54. #
  55. # DIFFERENCES FROM POSIX
  56. #
  57. # + A separate mutex is not needed to guard condition data.  Instead, a
  58. #   condition object can (must) be .acquire'ed and .release'ed directly.
  59. #   This eliminates a common error in using POSIX conditions.
  60. #
  61. # + Because of implementation difficulties, a POSIX `signal' wakes up
  62. #   _at least_ one .wait'ing thread.  Race conditions make it difficult
  63. #   to stop that.  This implementation guarantees to wake up only one,
  64. #   but you probably shouldn't rely on that.
  65. #
  66. # PROTOCOL
  67. #
  68. # Condition objects are used to block threads until "some condition" is
  69. # true.  E.g., a thread may wish to wait until a producer pumps out data
  70. # for it to consume, or a server may wish to wait until someone requests
  71. # its services, or perhaps a whole bunch of threads want to wait until a
  72. # preceding pass over the data is complete.  Early models for conditions
  73. # relied on some other thread figuring out when a blocked thread's
  74. # condition was true, and made the other thread responsible both for
  75. # waking up the blocked thread and guaranteeing that it woke up with all
  76. # data in a correct state.  This proved to be very delicate in practice,
  77. # and gave conditions a bad name in some circles.
  78. #
  79. # The POSIX model addresses these problems by making a thread responsible
  80. # for ensuring that its own state is correct when it wakes, and relies
  81. # on a rigid protocol to make this easy; so long as you stick to the
  82. # protocol, POSIX conditions are easy to "get right":
  83. #
  84. #  A) The thread that's waiting for some arbitrarily-complex condition
  85. #     (ACC) to become true does:
  86. #
  87. #     condition.acquire()
  88. #     while not (code to evaluate the ACC):
  89. #           condition.wait()
  90. #           # That blocks the thread, *and* releases the lock.  When a
  91. #           # condition.signal() happens, it will wake up some thread that
  92. #           # did a .wait, *and* acquire the lock again before .wait
  93. #           # returns.
  94. #           #
  95. #           # Because the lock is acquired at this point, the state used
  96. #           # in evaluating the ACC is frozen, so it's safe to go back &
  97. #           # reevaluate the ACC.
  98. #
  99. #     # At this point, ACC is true, and the thread has the condition
  100. #     # locked.
  101. #     # So code here can safely muck with the shared state that
  102. #     # went into evaluating the ACC -- if it wants to.
  103. #     # When done mucking with the shared state, do
  104. #     condition.release()
  105. #
  106. #  B) Threads that are mucking with shared state that may affect the
  107. #     ACC do:
  108. #
  109. #     condition.acquire()
  110. #     # muck with shared state
  111. #     condition.release()
  112. #     if it's possible that ACC is true now:
  113. #         condition.signal() # or .broadcast()
  114. #
  115. #     Note:  You may prefer to put the "if" clause before the release().
  116. #     That's fine, but do note that anyone waiting on the signal will
  117. #     stay blocked until the release() is done (since acquiring the
  118. #     condition is part of what .wait() does before it returns).
  119. #
  120. # TRICK OF THE TRADE
  121. #
  122. # With simpler forms of conditions, it can be impossible to know when
  123. # a thread that's supposed to do a .wait has actually done it.  But
  124. # because this form of condition releases a lock as _part_ of doing a
  125. # wait, the state of that lock can be used to guarantee it.
  126. #
  127. # E.g., suppose thread A spawns thread B and later wants to wait for B to
  128. # complete:
  129. #
  130. # In A:                             In B:
  131. #
  132. # B_done = condition()              ... do work ...
  133. # B_done.acquire()                  B_done.acquire(); B_done.release()
  134. # spawn B                           B_done.signal()
  135. # ... some time later ...           ... and B exits ...
  136. # B_done.wait()
  137. #
  138. # Because B_done was in the acquire'd state at the time B was spawned,
  139. # B's attempt to acquire B_done can't succeed until A has done its
  140. # B_done.wait() (which releases B_done).  So B's B_done.signal() is
  141. # guaranteed to be seen by the .wait().  Without the lock trick, B
  142. # may signal before A .waits, and then A would wait forever.
  143. #
  144. # BARRIERS
  145. #
  146. # A barrier object is created via
  147. #   import this_module
  148. #   your_barrier = this_module.barrier(num_threads)
  149. #
  150. # Methods:
  151. #   .enter()
  152. #      the thread blocks until num_threads threads in all have done
  153. #      .enter().  Then the num_threads threads that .enter'ed resume,
  154. #      and the barrier resets to capture the next num_threads threads
  155. #      that .enter it.
  156. #
  157. # EVENTS
  158. #
  159. # An event object is created via
  160. #   import this_module
  161. #   your_event = this_module.event()
  162. #
  163. # An event has two states, `posted' and `cleared'.  An event is
  164. # created in the cleared state.
  165. #
  166. # Methods:
  167. #
  168. #   .post()
  169. #      Put the event in the posted state, and resume all threads
  170. #      .wait'ing on the event (if any).
  171. #
  172. #   .clear()
  173. #      Put the event in the cleared state.
  174. #
  175. #   .is_posted()
  176. #      Returns 0 if the event is in the cleared state, or 1 if the event
  177. #      is in the posted state.
  178. #
  179. #   .wait()
  180. #      If the event is in the posted state, returns immediately.
  181. #      If the event is in the cleared state, blocks the calling thread
  182. #      until the event is .post'ed by another thread.
  183. #
  184. # Note that an event, once posted, remains posted until explicitly
  185. # cleared.  Relative to conditions, this is both the strength & weakness
  186. # of events.  It's a strength because the .post'ing thread doesn't have to
  187. # worry about whether the threads it's trying to communicate with have
  188. # already done a .wait (a condition .signal is seen only by threads that
  189. # do a .wait _prior_ to the .signal; a .signal does not persist).  But
  190. # it's a weakness because .clear'ing an event is error-prone:  it's easy
  191. # to mistakenly .clear an event before all the threads you intended to
  192. # see the event get around to .wait'ing on it.  But so long as you don't
  193. # need to .clear an event, events are easy to use safely.
  194. #
  195. # SEMAPHORES
  196. #
  197. # A semaphore object is created via
  198. #   import this_module
  199. #   your_semaphore = this_module.semaphore(count=1)
  200. #
  201. # A semaphore has an integer count associated with it.  The initial value
  202. # of the count is specified by the optional argument (which defaults to
  203. # 1) passed to the semaphore constructor.
  204. #
  205. # Methods:
  206. #
  207. #   .p()
  208. #      If the semaphore's count is greater than 0, decrements the count
  209. #      by 1 and returns.
  210. #      Else if the semaphore's count is 0, blocks the calling thread
  211. #      until a subsequent .v() increases the count.  When that happens,
  212. #      the count will be decremented by 1 and the calling thread resumed.
  213. #
  214. #   .v()
  215. #      Increments the semaphore's count by 1, and wakes up a thread (if
  216. #      any) blocked by a .p().  It's an (detected) error for a .v() to
  217. #      increase the semaphore's count to a value larger than the initial
  218. #      count.
  219. #
  220. # MULTIPLE-READER SINGLE-WRITER LOCKS
  221. #
  222. # A mrsw lock is created via
  223. #   import this_module
  224. #   your_mrsw_lock = this_module.mrsw()
  225. #
  226. # This kind of lock is often useful with complex shared data structures.
  227. # The object lets any number of "readers" proceed, so long as no thread
  228. # wishes to "write".  When a (one or more) thread declares its intention
  229. # to "write" (e.g., to update a shared structure), all current readers
  230. # are allowed to finish, and then a writer gets exclusive access; all
  231. # other readers & writers are blocked until the current writer completes.
  232. # Finally, if some thread is waiting to write and another is waiting to
  233. # read, the writer takes precedence.
  234. #
  235. # Methods:
  236. #
  237. #   .read_in()
  238. #      If no thread is writing or waiting to write, returns immediately.
  239. #      Else blocks until no thread is writing or waiting to write.  So
  240. #      long as some thread has completed a .read_in but not a .read_out,
  241. #      writers are blocked.
  242. #
  243. #   .read_out()
  244. #      Use sometime after a .read_in to declare that the thread is done
  245. #      reading.  When all threads complete reading, a writer can proceed.
  246. #
  247. #   .write_in()
  248. #      If no thread is writing (has completed a .write_in, but hasn't yet
  249. #      done a .write_out) or reading (similarly), returns immediately.
  250. #      Else blocks the calling thread, and threads waiting to read, until
  251. #      the current writer completes writing or all the current readers
  252. #      complete reading; if then more than one thread is waiting to
  253. #      write, one of them is allowed to proceed, but which one is not
  254. #      specified.
  255. #
  256. #   .write_out()
  257. #      Use sometime after a .write_in to declare that the thread is done
  258. #      writing.  Then if some other thread is waiting to write, it's
  259. #      allowed to proceed.  Else all threads (if any) waiting to read are
  260. #      allowed to proceed.
  261. #
  262. #   .write_to_read()
  263. #      Use instead of a .write_in to declare that the thread is done
  264. #      writing but wants to continue reading without other writers
  265. #      intervening.  If there are other threads waiting to write, they
  266. #      are allowed to proceed only if the current thread calls
  267. #      .read_out; threads waiting to read are only allowed to proceed
  268. #      if there are are no threads waiting to write.  (This is a
  269. #      weakness of the interface!)
  270.  
  271. import thread
  272.  
  273. class condition:
  274.     def __init__(self, lock=None):
  275.         # the lock actually used by .acquire() and .release()
  276.         if lock is None:
  277.             self.mutex = thread.allocate_lock()
  278.         else:
  279.             if hasattr(lock, 'acquire') and \
  280.                hasattr(lock, 'release'):
  281.                 self.mutex = lock
  282.             else:
  283.                 raise TypeError, 'condition constructor requires ' \
  284.                                  'a lock argument'
  285.  
  286.         # lock used to block threads until a signal
  287.         self.checkout = thread.allocate_lock()
  288.         self.checkout.acquire()
  289.  
  290.         # internal critical-section lock, & the data it protects
  291.         self.idlock = thread.allocate_lock()
  292.         self.id = 0
  293.         self.waiting = 0  # num waiters subject to current release
  294.         self.pending = 0  # num waiters awaiting next signal
  295.         self.torelease = 0      # num waiters to release
  296.         self.releasing = 0      # 1 iff release is in progress
  297.  
  298.     def acquire(self):
  299.         self.mutex.acquire()
  300.  
  301.     def release(self):
  302.         self.mutex.release()
  303.  
  304.     def wait(self):
  305.         mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
  306.         if not mutex.locked():
  307.             raise ValueError, \
  308.                   "condition must be .acquire'd when .wait() invoked"
  309.  
  310.         idlock.acquire()
  311.         myid = self.id
  312.         self.pending = self.pending + 1
  313.         idlock.release()
  314.  
  315.         mutex.release()
  316.  
  317.         while 1:
  318.             checkout.acquire(); idlock.acquire()
  319.             if myid < self.id:
  320.                 break
  321.             checkout.release(); idlock.release()
  322.  
  323.         self.waiting = self.waiting - 1
  324.         self.torelease = self.torelease - 1
  325.         if self.torelease:
  326.             checkout.release()
  327.         else:
  328.             self.releasing = 0
  329.             if self.waiting == self.pending == 0:
  330.                 self.id = 0
  331.         idlock.release()
  332.         mutex.acquire()
  333.  
  334.     def signal(self):
  335.         self.broadcast(1)
  336.  
  337.     def broadcast(self, num = -1):
  338.         if num < -1:
  339.             raise ValueError, '.broadcast called with num ' + `num`
  340.         if num == 0:
  341.             return
  342.         self.idlock.acquire()
  343.         if self.pending:
  344.             self.waiting = self.waiting + self.pending
  345.             self.pending = 0
  346.             self.id = self.id + 1
  347.         if num == -1:
  348.             self.torelease = self.waiting
  349.         else:
  350.             self.torelease = min( self.waiting,
  351.                                   self.torelease + num )
  352.         if self.torelease and not self.releasing:
  353.             self.releasing = 1
  354.             self.checkout.release()
  355.         self.idlock.release()
  356.  
  357. class barrier:
  358.     def __init__(self, n):
  359.         self.n = n
  360.         self.togo = n
  361.         self.full = condition()
  362.  
  363.     def enter(self):
  364.         full = self.full
  365.         full.acquire()
  366.         self.togo = self.togo - 1
  367.         if self.togo:
  368.             full.wait()
  369.         else:
  370.             self.togo = self.n
  371.             full.broadcast()
  372.         full.release()
  373.  
  374. class event:
  375.     def __init__(self):
  376.         self.state  = 0
  377.         self.posted = condition()
  378.  
  379.     def post(self):
  380.         self.posted.acquire()
  381.         self.state = 1
  382.         self.posted.broadcast()
  383.         self.posted.release()
  384.  
  385.     def clear(self):
  386.         self.posted.acquire()
  387.         self.state = 0
  388.         self.posted.release()
  389.  
  390.     def is_posted(self):
  391.         self.posted.acquire()
  392.         answer = self.state
  393.         self.posted.release()
  394.         return answer
  395.  
  396.     def wait(self):
  397.         self.posted.acquire()
  398.         if not self.state:
  399.             self.posted.wait()
  400.         self.posted.release()
  401.  
  402. class semaphore:
  403.     def __init__(self, count=1):
  404.         if count <= 0:
  405.             raise ValueError, 'semaphore count %d; must be >= 1' % count
  406.         self.count = count
  407.         self.maxcount = count
  408.         self.nonzero = condition()
  409.  
  410.     def p(self):
  411.         self.nonzero.acquire()
  412.         while self.count == 0:
  413.             self.nonzero.wait()
  414.         self.count = self.count - 1
  415.         self.nonzero.release()
  416.  
  417.     def v(self):
  418.         self.nonzero.acquire()
  419.         if self.count == self.maxcount:
  420.             raise ValueError, '.v() tried to raise semaphore count above ' \
  421.                   'initial value ' + `maxcount`
  422.         self.count = self.count + 1
  423.         self.nonzero.signal()
  424.         self.nonzero.release()
  425.  
  426. class mrsw:
  427.     def __init__(self):
  428.         # critical-section lock & the data it protects
  429.         self.rwOK = thread.allocate_lock()
  430.         self.nr = 0  # number readers actively reading (not just waiting)
  431.         self.nw = 0  # number writers either waiting to write or writing
  432.         self.writing = 0  # 1 iff some thread is writing
  433.  
  434.         # conditions
  435.         self.readOK  = condition(self.rwOK)  # OK to unblock readers
  436.         self.writeOK = condition(self.rwOK)  # OK to unblock writers
  437.  
  438.     def read_in(self):
  439.         self.rwOK.acquire()
  440.         while self.nw:
  441.             self.readOK.wait()
  442.         self.nr = self.nr + 1
  443.         self.rwOK.release()
  444.  
  445.     def read_out(self):
  446.         self.rwOK.acquire()
  447.         if self.nr <= 0:
  448.             raise ValueError, \
  449.                   '.read_out() invoked without an active reader'
  450.         self.nr = self.nr - 1
  451.         if self.nr == 0:
  452.             self.writeOK.signal()
  453.         self.rwOK.release()
  454.  
  455.     def write_in(self):
  456.         self.rwOK.acquire()
  457.         self.nw = self.nw + 1
  458.         while self.writing or self.nr:
  459.             self.writeOK.wait()
  460.         self.writing = 1
  461.         self.rwOK.release()
  462.  
  463.     def write_out(self):
  464.         self.rwOK.acquire()
  465.         if not self.writing:
  466.             raise ValueError, \
  467.                   '.write_out() invoked without an active writer'
  468.         self.writing = 0
  469.         self.nw = self.nw - 1
  470.         if self.nw:
  471.             self.writeOK.signal()
  472.         else:
  473.             self.readOK.broadcast()
  474.         self.rwOK.release()
  475.  
  476.     def write_to_read(self):
  477.     self.rwOK.acquire()
  478.     if not self.writing:
  479.         raise ValueError, \
  480.           '.write_to_read() invoked without an active writer'
  481.     self.writing = 0
  482.     self.nw = self.nw - 1
  483.     self.nr = self.nr + 1
  484.     if not self.nw:
  485.         self.readOK.broadcast()
  486.     self.rwOK.release()
  487.  
  488. # The rest of the file is a test case, that runs a number of parallelized
  489. # quicksorts in parallel.  If it works, you'll get about 600 lines of
  490. # tracing output, with a line like
  491. #     test passed! 209 threads created in all
  492. # as the last line.  The content and order of preceding lines will
  493. # vary across runs.
  494.  
  495. def _new_thread(func, *args):
  496.     global TID
  497.     tid.acquire(); id = TID = TID+1; tid.release()
  498.     io.acquire(); alive.append(id); \
  499.                   print 'starting thread', id, '--', len(alive), 'alive'; \
  500.                   io.release()
  501.     thread.start_new_thread( func, (id,) + args )
  502.  
  503. def _qsort(tid, a, l, r, finished):
  504.     # sort a[l:r]; post finished when done
  505.     io.acquire(); print 'thread', tid, 'qsort', l, r; io.release()
  506.     if r-l > 1:
  507.         pivot = a[l]
  508.         j = l+1   # make a[l:j] <= pivot, and a[j:r] > pivot
  509.         for i in range(j, r):
  510.             if a[i] <= pivot:
  511.                 a[j], a[i] = a[i], a[j]
  512.                 j = j + 1
  513.         a[l], a[j-1] = a[j-1], pivot
  514.  
  515.         l_subarray_sorted = event()
  516.         r_subarray_sorted = event()
  517.         _new_thread(_qsort, a, l, j-1, l_subarray_sorted)
  518.         _new_thread(_qsort, a, j, r,   r_subarray_sorted)
  519.         l_subarray_sorted.wait()
  520.         r_subarray_sorted.wait()
  521.  
  522.     io.acquire(); print 'thread', tid, 'qsort done'; \
  523.                   alive.remove(tid); io.release()
  524.     finished.post()
  525.  
  526. def _randarray(tid, a, finished):
  527.     io.acquire(); print 'thread', tid, 'randomizing array'; \
  528.                   io.release()
  529.     for i in range(1, len(a)):
  530.         wh.acquire(); j = randint(0,i); wh.release()
  531.         a[i], a[j] = a[j], a[i]
  532.     io.acquire(); print 'thread', tid, 'randomizing done'; \
  533.                   alive.remove(tid); io.release()
  534.     finished.post()
  535.  
  536. def _check_sort(a):
  537.     if a != range(len(a)):
  538.         raise ValueError, ('a not sorted', a)
  539.  
  540. def _run_one_sort(tid, a, bar, done):
  541.     # randomize a, and quicksort it
  542.     # for variety, all the threads running this enter a barrier
  543.     # at the end, and post `done' after the barrier exits
  544.     io.acquire(); print 'thread', tid, 'randomizing', a; \
  545.                   io.release()
  546.     finished = event()
  547.     _new_thread(_randarray, a, finished)
  548.     finished.wait()
  549.  
  550.     io.acquire(); print 'thread', tid, 'sorting', a; io.release()
  551.     finished.clear()
  552.     _new_thread(_qsort, a, 0, len(a), finished)
  553.     finished.wait()
  554.     _check_sort(a)
  555.  
  556.     io.acquire(); print 'thread', tid, 'entering barrier'; \
  557.                   io.release()
  558.     bar.enter()
  559.     io.acquire(); print 'thread', tid, 'leaving barrier'; \
  560.                   io.release()
  561.     io.acquire(); alive.remove(tid); io.release()
  562.     bar.enter() # make sure they've all removed themselves from alive
  563.                 ##  before 'done' is posted
  564.     bar.enter() # just to be cruel
  565.     done.post()
  566.  
  567. def test():
  568.     global TID, tid, io, wh, randint, alive
  569.     import whrandom
  570.     randint = whrandom.randint
  571.  
  572.     TID = 0                             # thread ID (1, 2, ...)
  573.     tid = thread.allocate_lock()        # for changing TID
  574.     io  = thread.allocate_lock()        # for printing, and 'alive'
  575.     wh  = thread.allocate_lock()        # for calls to whrandom
  576.     alive = []                          # IDs of active threads
  577.  
  578.     NSORTS = 5
  579.     arrays = []
  580.     for i in range(NSORTS):
  581.         arrays.append( range( (i+1)*10 ) )
  582.  
  583.     bar = barrier(NSORTS)
  584.     finished = event()
  585.     for i in range(NSORTS):
  586.         _new_thread(_run_one_sort, arrays[i], bar, finished)
  587.     finished.wait()
  588.  
  589.     print 'all threads done, and checking results ...'
  590.     if alive:
  591.         raise ValueError, ('threads still alive at end', alive)
  592.     for i in range(NSORTS):
  593.         a = arrays[i]
  594.         if len(a) != (i+1)*10:
  595.             raise ValueError, ('length of array', i, 'screwed up')
  596.         _check_sort(a)
  597.  
  598.     print 'test passed!', TID, 'threads created in all'
  599.  
  600. if __name__ == '__main__':
  601.     test()
  602.  
  603. # end of module
  604.